Train Your First MetaSpore Model
MetaSpore is a machine learning platform that provides a one-stop solution for data preprocessing, model training, and online prediction.
In this article, we briefly introduce the basic API of MetaSpore.
Prepare Data
We use the publicly available Terabyte Click Logs dataset published by CriteoLabs as our demo dataset.
We sample the dataset at a rate of 0.001 so that the demo finishes quickly. More information about the demo dataset can be found in MetaSpore Demo Dataset.
Execute the following cell to download the demo dataset into the working directory. The data files take up about 2.1 GiB of disk space and the download may take several minutes. If the download fails, please refer to MetaSpore Demo Dataset and download the dataset manually.
import metaspore
metaspore.demo.download_dataset()
You can check the downloaded dataset by executing the following cell.
!ls -l ${PWD}/data/
To upload the dataset to your own s3 bucket:
- Fill {YOUR_S3_BUCKET} and {YOUR_S3_PATH} with your preferred values in the following cell.
- Uncomment the cell by removing the leading # character.
- Execute the cell.
#!aws s3 cp --recursive ${PWD}/data/ s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/data/
Alternatively, you can open a terminal by selecting the File -> New -> Terminal menu item and executing Bash commands in it.
You can check the uploaded dataset in your s3 bucket by uncommenting and executing the following cell.
#!aws s3 ls s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/data/
The schema directory contains configuration files and must also be uploaded to s3 so that the model can be trained in a cluster environment.
#!aws s3 cp --recursive ${PWD}/schema/ s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/schema/
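If you are curious about the contents of the schema files before uploading them, you can print them from the local working directory. This assumes the demo schema directory is present locally and uses the file names referenced in the model definition later in this article.
!cat ${PWD}/schema/column_name_demo.txt
!cat ${PWD}/schema/combine_schema_demo.txt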
In the rest of the article, we assume the demo dataset has been uploaded to s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/data/ and the schema directory has been uploaded to s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/schema/. You should replace {YOUR_S3_BUCKET} and {YOUR_S3_PATH} with actual values before executing code cells containing these placeholders.
S3_ROOT_DIR = 's3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/'
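For illustration only, if your bucket were named my-bucket and your path were metaspore-demo (both hypothetical values), the cell above would read:
S3_ROOT_DIR = 's3://my-bucket/metaspore-demo/'  # hypothetical example values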
Define the Model
We can define our neural network model by subclassing torch.nn.Module, as with usual PyTorch models. The following DemoModule class provides an example.
Compared to usual PyTorch models, the notable difference is the _sparse layer, created by instantiating ms.EmbeddingSumConcat, which takes an embedding size and the paths of two text files. ms.EmbeddingSumConcat makes it possible to define large-scale sparse models in PyTorch, which is a distinguishing feature of MetaSpore.
The _schema_dir field is an s3 directory, which makes it possible to use the DemoModule class in a cluster environment.
import torch
import metaspore as ms

class DemoModule(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self._embedding_size = 16
        self._schema_dir = S3_ROOT_DIR + 'demo/schema/'
        self._column_name_path = self._schema_dir + 'column_name_demo.txt'
        self._combine_schema_path = self._schema_dir + 'combine_schema_demo.txt'
        # Sparse layer: embedding lookup for the features described by the two schema files.
        self._sparse = ms.EmbeddingSumConcat(self._embedding_size, self._column_name_path, self._combine_schema_path)
        self._sparse.updater = ms.FTRLTensorUpdater()
        self._sparse.initializer = ms.NormalTensorInitializer(var=0.01)
        # Dense layers: a multi-layer perceptron over the concatenated feature embeddings.
        self._dense = torch.nn.Sequential(
            ms.nn.Normalization(self._sparse.feature_count * self._embedding_size),
            torch.nn.Linear(self._sparse.feature_count * self._embedding_size, 1024),
            torch.nn.ReLU(),
            torch.nn.Linear(1024, 512),
            torch.nn.ReLU(),
            torch.nn.Linear(512, 1),
        )

    def forward(self, x):
        x = self._sparse(x)
        x = self._dense(x)
        return torch.sigmoid(x)
Instantiate the DemoModule class to define our PyTorch model.
module = DemoModule()
Train the Model
To train our model, we first need to create a ms.PyTorchEstimator, passing in several arguments including our PyTorch model module and the numbers of workers and servers. model_out_path specifies where to store the trained model. input_label_column_index specifies the column index of the label column in the dataset, which is 0 for the demo dataset.
model_out_path = S3_ROOT_DIR + 'demo/output/dev/model_out/'
estimator = ms.PyTorchEstimator(module=module,
                                worker_count=1,
                                server_count=1,
                                model_out_path=model_out_path,
                                experiment_name='0.1',
                                input_label_column_index=0)
Next, we create a Spark session by calling ms.spark.get_session() and load the training dataset by calling ms.input.read_s3_csv(). delimiter specifies the column delimiter of the dataset, which is the TAB character '\t' for the demo dataset.
train_dataset_path = S3_ROOT_DIR + 'demo/data/train/day_0_0.001_train.csv'
spark_session = ms.spark.get_session(local=True,
                                     batch_size=100,
                                     worker_count=estimator.worker_count,
                                     server_count=estimator.server_count)
train_dataset = ms.input.read_s3_csv(spark_session, train_dataset_path, delimiter='\t')
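If you want a quick sanity check of the loaded data, and assuming ms.input.read_s3_csv() returns a regular PySpark DataFrame (as model.transform() does later in this article), you can peek at the first rows:
train_dataset.show(2)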
Finally, we call the fit() method of ms.PyTorchEstimator to train our model. This will take several minutes and you can follow the progress in the output of the cell. The trained model is saved to model_out_path and also returned as the model variable.
model = estimator.fit(train_dataset)
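Optionally, you can verify that the trained model files have been written to model_out_path, for example with the AWS CLI. Uncomment and execute the following cell after replacing the placeholders as before.
#!aws s3 ls --recursive s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/output/dev/model_out/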
Evaluate the Model
To evaluate our model, we use the ms.input.read_s3_csv() function again to load the test dataset, passing in the column delimiter '\t'.
test_dataset_path = S3_ROOT_DIR + 'demo/data/test/day_0_0.001_test.csv'
test_dataset = ms.input.read_s3_csv(spark_session, test_dataset_path, delimiter='\t')
Next, we call the model.transform() method to transform the test dataset, which adds a column named rawPrediction containing the model's predictions. For ease of integration with Spark MLlib, model.transform() also adds a column named label containing the actual labels.
Like the training process, this will take several minutes and you can follow the progress in the output of the cell. The transformed test dataset is stored in the result variable.
result = model.transform(test_dataset)
result is a normal PySpark DataFrame and can be inspected with its usual methods.
result.show(5)
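For example, to compare the actual labels with the model's predictions for a few rows, standard DataFrame operations such as select() work as usual; this is plain PySpark rather than a MetaSpore-specific API.
result.select('label', 'rawPrediction').show(5)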
Finally, we use pyspark.ml.evaluation.BinaryClassificationEvaluator to compute the test AUC.
import pyspark
evaluator = pyspark.ml.evaluation.BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(result)
print('test_auc: %g' % test_auc)
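By default, BinaryClassificationEvaluator computes the area under the ROC curve from the rawPrediction and label columns, which is why no extra configuration is needed above. If you also want the area under the precision-recall curve, a minimal sketch using the same Spark MLlib API would be:
pr_evaluator = pyspark.ml.evaluation.BinaryClassificationEvaluator(metricName='areaUnderPR')
test_aupr = pr_evaluator.evaluate(result)
print('test_aupr: %g' % test_aupr)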
When all computations are done, we should call the stop() method of spark_session to make sure all resources are released.
spark_session.stop()
Summary
We have illustrated how to train and evaluate a neural network model in MetaSpore. Users familiar with PyTorch and Spark MLlib should be able to get started easily, which is the design goal of MetaSpore.